 1.Kafka源码剖析之Producer生产者流程中Producer示例
   
   首先我们先通过一段代码来展示 KafkaProducer 的使用方法。在下面的示例中，我们使用KafkaProducer 实现
向kafka发送消息的功能。在示例程序中，首先将 KafkaProduce 使用的配置写入到 Properties 中，每项配置的具体
含义在注释中进行解释。之后以此 Properties 对象为参数构造 KafkaProducer 对象,最后通过 send 方法完成发
送，代码中包含同步发送、异步发送两种情况。
    public static void main(String[] args) throws ExecutionException,
            InterruptedException {
        Properties props = new Properties();
// 客户端id
        props.put("client.id", "KafkaProducerDemo");
// kafka地址,列表格式为host1:port1,host2:port2,…，无需添加所有的集群地址，kafka会根据
//        提供的地址发现其他的地址（建议多提供几个，以防提供的服务器关闭）
        props.put("bootstrap.servers", "localhost:9092");
// 发送返回应答方式
// 0:Producer 往集群发送数据不需要等到集群的返回，不确保消息发送成功。安全性最低但是效率最高
// 1:Producer 往集群发送数据只要 Leader 应答就可以发送下一条，只确保Leader接收成功。
// -1或者all：Producer 往集群发送数据需要所有的ISR Follower都完成从Leader的同步才会发
//        送下一条，确保Leader发送成功和所有的副本都成功接收。安全性最高，但是效率最低。
        props.put("acks", "all");
// 重试次数
        props.put("retries", 0);
// 重试间隔时间
        props.put("retries.backoff.ms", 100);
// 批量发送的大小
        props.put("batch.size", 16384);
// 一个Batch被创建之后，最多过多久，不管这个Batch有没有写满，都必须发送出去
        props.put("linger.ms", 10);
// 缓冲区大小
        props.put("buffer.memory", 33554432);
// key序列化方式
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
// value序列化方式
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
// topic
        String topic = "lagou_edu";
        Producer<String, String> producer = new KafkaProducer<>(props);
        AtomicInteger count = new AtomicInteger();
        while (true) {
            int num = count.get();
            String key = Integer.toString(num);
            String value = Integer.toString(num);
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key,
                    value);
            if (num % 2 == 0) {
// 偶数异步发送
// 第一个参数record封装了topic、key、value
// 第一个参数是一个callback对象，当生产者接收到kafka发来的ACK确认消息时，会调用
// 此CallBack对象的onComplete⽅法
                producer.send(record, (recordMetadata, e) -> {
                    System.out.println("num:" + num + " topic:" +
                            recordMetadata.topic() + " offset:" + recordMetadata.offset());
                });
            } else {
// 同步发送
// KafkaProducer.send方法返回的类型是Future<RecordMetadata>，通过get⽅法阻
//                塞当前线程，等待kafka服务端ACK响应
                producer.send(record).get();
            }
            count.incrementAndGet();
            TimeUnit.MILLISECONDS.sleep(100);
        }
    }
   1).同步发送
	   (1).KafkaProducer.send方法返回的类型是Future<RecordMetadata>，通过get方法阻塞当前线程，等待kafka
服务端ACK响应
       producer.send(record).get() 
   2).异步发送
	   (1).第一个参数record封装了topic、key、value
	   (2).第二个参数是一个callback对象，当⽣产者接收到kafka发来的ACK确认消息时，会调用此CallBack对象的
onComplete方法
    producer.send(record, (recordMetadata, e) -> {
       System.out.println("num:" + num + " topic:" +
            recordMetadata.topic() + " offset:" + recordMetadata.offset());
    });
 2.KafkaProducer实例化 
   
   了解了 KafkaProducer 的基本使用，开始深入了解的KafkaProducer原理和实现，先看一下构造方法核心逻辑
      private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
                          Serializer<V> valueSerializer) {
        try {
// 获取用户的配置
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
// 系统时间
            this.time = Time.SYSTEM;
// 获取client.id配置
            String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
// 如果client.id为空，设置默认值:producer-1
            if (clientId.length() <= 0)
                clientId = "producer-" +
                        PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
            this.clientId = clientId;
// 获取事务id,如果没有配置则为null
            String transactionalId =
                    userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
                            (String)
                                    userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
            LogContext logContext;
            if (transactionalId == null)
                logContext = new LogContext(String.format("[Producer clientId=%s] ",
                        clientId));
            else
                logContext = new LogContext(String.format("[Producer clientId=%s,
                        transactionalId=%s] ", clientId, transactionalId));
            log = logContext.logger(KafkaProducer.class);
            log.trace("Starting the Kafka producer");
// 创建client-id的监控map
            Map<String, String> metricTags = Collections.singletonMap("client-id",
                    clientId);
// 设置监控配置，包含样本量、取样时间窗口、记录级别
            MetricConfig metricConfig = new
                    MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                            TimeUnit.MILLISECONDS)
                    .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_R
                            ECORDING_LEVEL_CONFIG)))
                    .tags(metricTags);
// 监控数据上报类
            List<MetricsReporter> reporters =
                    config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                            MetricsReporter.class);
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);
// 生成生产者监控
            ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
// 分区类
            this.partitioner =
                    config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                            Partitioner.class);
// 重试时间 retry.backoff.ms 默认100ms
            long retryBackoffMs =
                    config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            if (keySerializer == null) {
// 反射生成key序列化方式
                this.keySerializer =
                        ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONF
                                IG,
                                Serializer.class));
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = ensureExtended(keySerializer);
            }
            if (valueSerializer == null) {
// 反射生成key序列化方式
                this.valueSerializer =
                        ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CO
                                NFIG,
                                Serializer.class));
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = ensureExtended(valueSerializer);
            }
// load interceptors and make sure they get clientId
// 确认client.id添加到用户的配置里面
            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
// 获取多个拦截器,为空则不处理
            List<ProducerInterceptor<K, V>> interceptorList = (List) (new
                    ProducerConfig(userProvidedConfigs,
                    false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class);
            this.interceptors = interceptorList.isEmpty() ? null : new
                    ProducerInterceptors<>(interceptorList);
            // 集群资源监听器,在元数据变更时会有通知
            ClusterResourceListeners clusterResourceListeners =
                    configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList,
                            reporters);
// ⽣产者每隔⼀段时间都要去更新一下集群的元数据,默认5分钟
            this.metadata = new Metadata(retryBackoffMs,
                    config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                    true, true, clusterResourceListeners);
// 生产者往服务端发送消息的时候，规定一条消息最大多大？
// 如果你超过了这个规定消息的大小，你的消息就不能发送过去。
// 默认是1M，这个值偏小，在生产环境中，我们需要修改这个值。
// 经验值是10M。但是大家也可以根据自己公司的情况来。
            this.maxRequestSize =
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
//指的是缓存大小
//默认值是32M，这个值一般是够用，如果有特殊情况的时候，我们可以去修改这个值。
            this.totalMemorySize =
                    config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
// kafka是支持压缩数据的，可以设置压缩格式,默认是不压缩，支持gzip、snappy、lz4
// 一次发送出去的消息就更多。生产者这儿会消耗更多的cpu.
            this.compressionType =
                    CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
// 配置控制了KafkaProducer.send()并将KafkaProducer.partitionsFor()被阻塞多长
// 时间,由于缓冲区已满或元数据不可用，这些方法可能会被阻塞止
            this.maxBlockTimeMs =
                    config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
// 控制客户端等待请求响应的最长时间。如果在超时过去之前未收到响应，客户端将在必要时重
            新发送请求，或者如果重试耗尽，请求失败
            this.requestTimeoutMs =
                    config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
// 事务管理器
            this.transactionManager = configureTransactionState(config, logContext,
                    log);
// 重试次数
            int retries = configureRetries(config, transactionManager != null, log);
// 使用幂等性，需要将 enable.idempotence 配置项设置为true。并且它对单个分区的发
// 送，一次性最多发送5条
            int maxInflightRequests = configureInflightRequests(config,
                    transactionManager != null);
// 如果开启了幂等性，但是⽤户指定的ack不为 -1，则会抛出异常
            short acks = configureAcks(config, transactionManager != null, log);
            this.apiVersions = new ApiVersions();
// 创建核心组件：记录累加器
            this.accumulator = new RecordAccumulator(logContext,
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.totalMemorySize,
                    this.compressionType,
                    config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                    retryBackoffMs,
                    metrics,
                    time,
                    apiVersions,
                    transactionManager);
// 获取broker地址列表
            List<InetSocketAddress> addresses =
                    ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVER
                            S_CONFIG));
// 更新元数据
            this.metadata.update(Cluster.bootstrap(addresses), Collections.
                    <String>emptySet(), time.milliseconds());
// 创建通道，是否需要加密
            ChannelBuilder channelBuilder =
                    ClientUtils.createChannelBuilder(config);
            Sensor throttleTimeSensor =
                    Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
// 初始化了一个重要的管理⽹路的组件
// connections.max.idle.ms: 默认值是9分钟, ⼀个⽹络连接最多空闲多久，超过这个空闲
// 时间，就关闭这个网络连接。
// max.in.flight.requests.per.connection：默认是5, producer向broker发送数据
// 的时候，其实是有多个网络连接。每个网络连接可以忍受 producer端发送给broker 消息然后消息没有响应的个
                    数
            NetworkClient client = new NetworkClient(
                    new
                            Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                            this.metrics, time, "producer", channelBuilder,
                            logContext),
                    this.metadata,
                    clientId,
                    maxInflightRequests,
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                    this.requestTimeoutMs,
                    time,
                    true,
                    apiVersions,
                    throttleTimeSensor,
                    logContext);
// 发送线程
            this.sender = new Sender(logContext,
                    client,
                    this.metadata,
                    this.accumulator,
                    maxInflightRequests == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    acks,
                    retries,
                    metricsRegistry.senderMetrics,
                    Time.SYSTEM,
                    this.requestTimeoutMs,
                    config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                    this.transactionManager,
                    apiVersions);
// 线程名称
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
// 启动守护线程
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
            this.errors = this.metrics.sensor("errors");
// 把用户配置的参数，但是没有⽤到的打印出来
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
            log.debug("Kafka producer started");
        } catch (Throwable t) {
// call close methods if internal objects are already constructed this
// is to prevent resource leak. see KAFKA-2121
            close(0, TimeUnit.MILLISECONDS, true);
// now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    } 
 3.消息发送过程
   
   Kafka消息实际发送以 send 方法为入口：
   @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
    {
// intercept the record, which can be potentially modified; this method does
// not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record :
                this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }
   1).拦截器
   首先方法会先进入拦截器集合 ProducerInterceptors,onSend方法是遍历拦截器 onSend 方法，拦截器的目
的是将数据处理加工， kafka 本身并没有给出默认的拦截器的实现。如果需要使用拦截器功能，必须自己实现
ProducerInterceptor 接口。
   public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
// 遍历所有拦截器，顺序执行，如果有异常只打印日志，不会向上抛出
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
// do not propagate interceptor exception, log and continue calling
// other interceptors
// be careful not to throw exception from here
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic:
                {}, partition: {}", record.topic(), record.partition(), e);
else
                log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }
    2).拦截器核心逻辑
	ProducerInterceptor 接口包括三个方法：
	 (1).onSend(ProducerRecord)：该方法封装进KafkaProducer.send方法中，即它运行在用户主线程中的。
Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作，但最好保
证不要修改消息所属的topic和分区，否则会影响目标分区的计算
	 (2).onAcknowledgement(RecordMetadata, Exception)：该方法会在消息被应答之前或消息发送失败时
调用，并且通常都是在producer回调逻辑触发之前。onAcknowledgement运⾏在producer的IO线程中，因此不
要在该方法中放入很重的逻辑，否则会拖慢producer的消息发送效率
	 (3).close：关闭interceptor，主要用于执行一些资源清理工作
	 (4).拦截器可能被运行在多个线程中，因此在具体实现时用户需要自行确保线程安全。另外倘若指定了
多个interceptor，则producer将按照指定顺序调用它们，并仅仅是捕获每个interceptor可能抛出的异常记
录到错误日志中而非在向上传递。
	3).发送五步骤
	下面仔细来看一下 doSend 方法的运行过程：
	private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback)
    {
// 首先创建一个主题分区类
        TopicPartition tp = null;
        try {
// first make sure the metadata for the topic is available
// 首先确保该topic的元数据可用
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(),
                    record.partition(), maxBlockTimeMs);
            long remainingWaitMs = Math.max(0, maxBlockTimeMs -
                    clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
// 序列化 record 的 key 和 value
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(),
                        record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " +
                        record.key().getClass().getName() +
                        " to class " +
                        producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(),
                        record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " +
                        record.value().getClass().getName() +
                        " to class " +
                        producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
// 获取该 record 要发送到的 partition
            int partition = partition(record, serializedKey, serializedValue,
                    cluster);
            tp = new TopicPartition(record.topic(), partition);
// 给header设置只读
            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
            int serializedSize =
                    AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                            compressionType, serializedKey, serializedValue, headers);
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? time.milliseconds() :
                    record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}",
                    record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and
            interceptor callback
            Callback interceptCallback = this.interceptors == null ? callback : new
                    InterceptorCallback<>(callback, this.interceptors, tp);
            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);
// 向 accumulator 中追加 record 数据，数据会先进⾏缓存
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp,
                    timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
// 如果追加完数据后，对应的 RecordBatch 已经达到了 batch.size 的大小（或者batch
// 的剩余空间不足以添加下⼀条 Record），则唤醒 sender 线程发送数据。
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either
                        full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);
        } catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (KafkaException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called
// before anything else in this method
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }
	 (1).Producer 通过 waitOnMetadata()方法来获取对应topic的metadata信息，需要先该topic是可用的
	 (2).Producer 端对record的key和value值进行序列化操作，在Consumer端再进行相应的反序列化
	 (3).获取partition值，具体分为下面三种情况：
	     (1).指明 partition 的情况下，直接将指明的值直接作为 partiton 值
		 (2).没有指明 partition 值但有key的情况下，将key的hash值与topic的partition数进行取余得到
partition值
		 (3).既没有partition值又没有key值的情况下,第一次调用时随机生成一个整数(后面每次调用
在这个整数上自增），将这个值与topic可用的 partition总数取余得到 partition值，也就是常说的
round-robin 算法
         (4).Producer 默认使用的 partitioner 是
		 org.apache.kafka.clients.producer.internals.DefaultPartitioner
	 (4).向accumulator写数据,先将record写入到 buffer中,当达到一个batch.size的大小时,再唤起sender
线程去发送 RecordBatch，这里仔细分析一下Producer是如何向buffer写入数据的
         (1).获取该 topic-partition 对应的 queue，没有的话会创建一个空的 queue
		 (2).向queue中追加数据,先获取queue中最新加入的那个RecordBatch，如果不存在或者存在但剩
余空余不足以添加本条 record 则返回 null，成功写入的话直接返回结果，写入成功
		 (3).创建一个新的RecordBatch,初始化内存大小根据 max(batch.size, Records.LOG_OVERHEAD +
Record.recordSize(key, value)) 来确定（防止单条 record 过大的情况）
		 (4).向新建的RecordBatch写入record，并将RecordBatch 添加到queue中，返回结果，写入成功
	 (5).发送 RecordBatch,当record写入成功后,如果发现RecordBatch已满足2发送的条件(通常是queue中
有多个batch,那么最先添加的那些 batch 肯定是可以发送了)，那么就会唤醒 sender 线程，发送
RecordBatch 。sender 线程对 RecordBatch 的处理是在 run() 方法中进行的，该方法具体实现如下：
         (1).获取那些已经可以发送的 RecordBatch 对应的 nodes
		 (2).如果与node没有连接(如果可以连接,同时初始化该连接),就证明该node暂时不能发送数据,暂
时移除该 node
		 (3).返回该node对应的所有可以发送的RecordBatch组成的 batches(key是node.id),并将
RecordBatch 从对应的 queue 中移除
		 (4).将由于元数据不可用而导致发送超时的 RecordBatch 移除
		 (5).
	4).MetaData更新机制
	 (1).metadata.requestUpdate()将metadata的needUpdate变量设置为true(强制更新),并返回当前的版
本号(version)，通过版本号来判断 metadata 是否完成更新
	 (2).sender.wakeup() 唤醒 sender 线程，sender 线程⼜会去唤醒NetworkClient线程去更新
	 (3).metadata.awaitUpdate(version, remainingWaitMs) 等待 metadata 的更新
	 (4).所以，每次 Producer 请求更新 metadata 时，会有以下几种情况:
	     (1).如果 node 可以发送请求，则直接发送请求
		 (2).如果该 node 正在建立连接，则直接返回
		 (3).如果该 node 还没建立连接，则向 broker 初始化链接
	 (5).NetworkClient的poll方法中判断是否需要更新meta数据,handleCompletedReceives处理metadata的
更新，最终是调用的DefaultMetadataUpdater 中的 handleCompletedMetadataResponse 方法处理